[improve](streaming-job) async chunk splitting for StreamingInsertJob #63079
JNSimba wants to merge 13 commits
Conversation
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
Pull request overview
This PR moves StreamingInsertJob (CDC FROM-TO and cdc_stream TVF) snapshot chunk splitting from a synchronous CREATE STREAMING JOB path to an incremental, scheduler-tick-driven flow. The goal is to avoid long blocking CREATE times and BRPC timeouts on large / skewed PK tables by fetching snapshot splits in small batches and persisting progress for recovery.
Changes:
- Adds split-progress APIs to `SourceOffsetProvider` and implements an async split state machine in `JdbcSourceOffsetProvider` (plus new FE tests).
- Introduces `FetchTableSplitsRequest` fields to drive stateless, resumable split generation (`nextSplitStart`/`nextSplitId`/`batchSize`) and rebuilds cdc_client split fetching around the flink-cdc `ChunkSplitter`.
- Persists per-table chunk lists incrementally via `StreamingJobUtils.upsertChunkList`, and advances splits each scheduler tick (including a pre-advance in PENDING).
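The incremental split state machine described above can be sketched as follows. This is a simplified illustration of the idea behind the PR's `SplitProgress`/`advanceSplits`, not the actual Doris classes; the field names and chunking logic here are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of a resumable split-progress state machine.
// Assumption: this mirrors the idea behind the PR's SplitProgress /
// advanceSplits hooks, not the actual Doris implementation.
public class SplitProgressSketch {
    public String currentSplittingTable; // table currently being chunked; null when idle
    public int nextSplitId;              // resume cursor: id of the next chunk to cut

    // One scheduler tick: cut at most batchSize chunks and remember where to resume.
    public List<String> advanceOneBatch(String table, int batchSize, int totalChunks) {
        List<String> batch = new ArrayList<>();
        if (currentSplittingTable == null) {
            currentSplittingTable = table; // start splitting this table
            nextSplitId = 0;
        }
        while (batch.size() < batchSize && nextSplitId < totalChunks) {
            batch.add(table + "-split-" + nextSplitId);
            nextSplitId++;
        }
        if (nextSplitId >= totalChunks) {
            currentSplittingTable = null; // table fully cut; idle until the next table
        }
        return batch;
    }

    public boolean noMoreSplits(int totalChunks) {
        return currentSplittingTable == null && nextSplitId >= totalChunks;
    }
}
```

Because each tick only cuts one bounded batch and persists the cursor, a restart can resume mid-table instead of re-splitting from scratch.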
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java | Reworks /api/fetchSplits handling to drive flink-cdc ChunkSplitter directly (stateless batch split generation). |
| fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/SplitProgressTest.java | Unit tests for SplitProgress default state and deep-copy semantics. |
| fe/fe-core/src/test/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProviderAsyncSplitTest.java | Unit tests covering async split advancement, dedup, noMoreSplits, and committed-progress advancement. |
| fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java | Adds per-table chunk_list UPSERT support with id reuse / allocation. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java | Adds default split-progress hooks (initSplitProgress, advanceSplits, noMoreSplits). |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java | Removes create-time pre-splitting; re-init split progress on replay; relies on scheduler-driven split fetching. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | Implements async split progress, scheduler-driven split fetching, persistence to system table, and restart replay logic. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java | Calls advanceSplitsIfNeed() each tick and pre-advances once in PENDING before dispatch. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Persists syncTables, initializes split progress on CREATE, and adds advanceSplitsIfNeed() that pauses job on failure. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java | Adds nextSplitStart, nextSplitId, and batchSize fields to support resumable batched split fetching. |
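Based on the file summary above, a minimal sketch of the resumable request contract might look like this. The field names follow the PR description (`nextSplitStart`, `nextSplitId`, `batchSize`); the types and the helper method are assumptions for illustration only.

```java
// Hypothetical sketch of a resumable split-fetch request, loosely modeled on
// the fields the PR adds to FetchTableSplitsRequest. Not the real class.
public class FetchTableSplitsRequestSketch {
    public String tableId;          // table whose chunks are being cut
    public Object[] nextSplitStart; // chunk-key cursor; null means "start from the beginning"
    public int nextSplitId;         // id to assign to the next generated chunk
    public int batchSize;           // max chunks to return in one RPC

    // A first-batch request carries no cursor, so cdc_client can stay stateless:
    // everything needed to resume lives in the request itself.
    public boolean isFirstBatch() {
        return nextSplitStart == null && nextSplitId == 0;
    }
}
```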
Comments suppressed due to low confidence (1)
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcTvfSourceOffsetProvider.java:320
- replayIfNeed() comment still says snapshot splits in the meta table are "written by initOnCreate", but initOnCreate() is now an intentional no-op and meta writes come from scheduler-driven advanceSplits()/upsertChunkList. Updating this comment will avoid confusion when debugging TVF recovery behavior.
// Re-init transient split progress fields lost across FE restart.
// syncTables itself is persisted on StreamingInsertJob; cdcSplitProgress is rebuilt empty
// here and advanceSplits will resume from the system table on next tick.
if (cdcSplitProgress == null) {
initSplitProgress(job.getSyncTables());
}
if (currentOffset == null) {
// No committed txn yet. If snapshot splits exist in the meta table (written by
// initOnCreate), restore remainingSplits so getNextOffset() returns snapshot splits
// instead of a BinlogSplit (which would incorrectly skip the snapshot phase).
/review
I found one additional blocking issue beyond the existing review threads.
Critical checkpoint conclusions:
- Goal/test coverage: the PR moves CDC chunk splitting from CREATE-time blocking to scheduler-driven async fetching and adds focused unit tests, but the TVF FE-restart path is not covered and can stop fetching the remaining chunks.
- Scope: the change is focused on async split progress, though it touches both FROM-TO and TVF paths.
- Concurrency: existing threads already cover the broad splitsLock/RPC concern; I did not add a duplicate. The new issue is lifecycle/replay state, not a new lock-order finding.
- Lifecycle/replay: blocking issue found for cdc_stream TVF restart because transient cachedSyncTables is not restored.
- Configuration/compatibility: no new config or wire-incompatible persistent format issue found beyond the added optional request fields.
- Parallel paths: FROM-TO replay initializes split progress, but the TVF override does not do the equivalent unconditionally.
- Testing: unit tests cover the base provider state machine, but miss TVF replay after FE restart. No end-to-end regression result is included in this PR.
- Observability/performance/transactions: no additional distinct issue found beyond existing review threads.
- User focus: no additional user-provided review focus was supplied.
run buildall
/review
Reviewed the full PR diff and existing review threads. I found two additional correctness issues that are not duplicates of the existing comments.
Critical checkpoint conclusions:
- Goal/test: The PR moves CDC snapshot split generation to scheduler-driven batches and adds tests for the base async state machine, but the cdc_stream TVF path still has an untested concurrent update path and the split-key type cache lacks collision coverage.
- Scope/focus: The implementation is mostly focused, but it now routes both non-TVF and TVF CDC through shared async splitting state, which exposes subclass state-management gaps.
- Concurrency: Blocking issue found. `JdbcTvfSourceOffsetProvider.updateOffset()` mutates inherited split lists/maps without `splitsLock` while scheduler-driven `advanceSplits()` and inherited readers use that lock.
- Lifecycle/restart: Existing review threads already cover TVF restart/cache restoration concerns; I did not duplicate them.
- Config: New CDC RPC/batch configs are present with descriptions and are read dynamically.
- Compatibility/protocol: FE now sends `batchSize` and resume fields; existing threads already cover composite-boundary contract concerns.
- Parallel paths: MySQL and generic JDBC split fetching were both updated; the cache-key collision issue affects the generic JDBC path and should be considered for MySQL if chunk key/database identity can vary.
- Tests: Added tests cover base provider state, but no TVF concurrency/restart continuation test or cache-collision test covers the issues below.
- Observability: Logs were added around split fetch and progress; no additional blocker found.
- Persistence/transactions: Snapshot progress is persisted through the meta table/EditLog or txn attachment depending on path; existing unresolved threads cover restart truncation risks.
- Performance: Existing review threads already cover long lock holds and repeated sorting; I did not duplicate them.
User focus: no additional user-provided review focus was supplied.
run buildall
FE UT Coverage Report: Increment line coverage
… fix unevenly-cut resume
/review
Automated review result: request changes. I did not add new inline comments because the blocking findings I confirmed are already covered by existing review threads and should not be duplicated.
Critical checkpoint conclusions:
- Goal/test proof: The PR aims to move CDC snapshot split generation to scheduler-driven batches and add resume/restart coverage. Tests were added, but current code still has unresolved correctness and concurrency concerns already raised in inline threads, including snapshot truncation/races and split-fetch batching behavior.
- Scope/focus: The feature is focused, but the change is broad across FE scheduling, persisted split metadata, CDC client split generation, and TVF recovery; several edge paths still need tightening before merge.
- Concurrency: Not OK. Existing threads cover split-state access/locking issues and long blocking RPC/internal SQL work while holding splitsLock. These can delay task commits/scheduling or race with TVF updateOffset paths.
- Lifecycle/restart: Not OK. Existing TVF restart comments cover mid-snapshot replay/resume hazards where transient split progress/table cache reconstruction can truncate snapshot progress after restored chunks are consumed.
- Configuration: A new dynamic FE batch-size config is read at split-fetch time, which is appropriate, but existing comments note the batch contract/enforcement is still incomplete in CDC client paths.
- Compatibility/storage format: No new external storage format incompatibility found beyond the new persisted split-progress fields, but replay behavior must be correct across FE restart before this is safe.
- Parallel paths: Not fully OK. MySQL, generic JDBC/Postgres, non-TVF, and cdc_stream TVF paths all need consistent batch limits, startup-mode handling, and resume semantics; existing threads identify gaps.
- Conditional checks/error handling: Some existing concerns remain around splitId parsing, composite split boundaries, SQL literal construction, and error clarity.
- Test coverage/results: Regression and unit tests were added, but coverage does not yet prove the unresolved edge cases from existing threads are fixed; no tests were run by this review.
- Observability: Logging was added for split fetch/progress, sufficient for basic diagnosis, but correctness blockers remain.
- Transaction/persistence/data correctness: Not OK until the existing replay, upsert, and split-progress concerns are resolved; these can affect snapshot completeness/duplicates after restart or retries.
- FE/BE variable passing: The new batchSize field is passed from FE now, but existing comments show downstream enforcement still needs work.
- Performance: Existing comments cover heavy work under splitsLock and repeated full-list sorting/materialization; these are relevant for large split counts.
User focus: No additional user-provided review focus was supplied.
Existing inline review threads should be addressed rather than duplicated here, especially the split-state concurrency/locking, TVF restart recovery, SQL upsert safety, batch-size enforcement, split-key/composite-boundary handling, and split metadata determinism issues.
…plits and sink resolveSplitKeyClass to PG reader
TPC-H: Total hot run time: 29471 ms
TPC-DS: Total hot run time: 170192 ms
…re table id, add TVF mid-snapshot restart case
…splitter is genuinely mid-flight at restart
run buildall
…uneven restart-fe case
run buildall
/review
I cannot approve this PR yet. I did not add duplicate inline comments because the existing review threads already cover the blocking issues I found, but those blockers still affect the PR's correctness.
Critical checkpoint conclusions:
- Goal/test coverage: The PR aims to make CDC snapshot split generation asynchronous and resumable; tests were added for async split, pause/resume, restart, special offset, and TVF paths, but the existing threads identify uncovered failure cases where resume/split progress can still truncate or loop.
- Scope/focus: The implementation is focused on streaming CDC split state, but it changes FE scheduling, durable split metadata, FE-BE request contracts, cdc_client split generation, and regression tests together, which increases integration risk.
- Concurrency: This path is concurrent between scheduler ticks, task dispatch, and commit/updateOffset paths. Some lock coverage was improved, but existing review threads still identify long RPC/meta-table operations and split-state races that need resolution.
- Lifecycle/restart: Restart recovery is central to this change. Existing threads already point out restart cases where transient split progress/table cache and qualified-vs-bare table IDs can prevent fetching the next batch after restored chunks are consumed.
- Config: The new streaming_cdc_fetch_splits_batch_size is mutable and passed to FE-BE fetchSplits; however existing comments note batch size is not consistently enforced by cdc_client split generation.
- Compatibility/protocol: FetchTableSplitsRequest adds cursor and batch fields. Existing comments cover the single-column boundary assumption and splitId format dependency risks.
- Parallel paths: MySQL, generic JDBC/Postgres, and TVF/non-TVF paths are all touched; existing comments identify issues present in both MySQL and generic JDBC paths.
- Special conditions: Empty split batches, malformed split IDs, non-initial startup modes, and table-name normalization are all special cases; existing inline threads already cover unresolved risks in these areas.
- Persistence/transactionality: The streaming_job_meta chunk_list write is part of durable split state. Existing threads identify unsafe SQL construction/id allocation and durable-state divergence risks.
- Data correctness: Because snapshot-to-binlog handoff and restart replay depend on complete split definitions and high-watermarks, the existing persistence/resume issues can cause re-cut, skipped chunks, or snapshot truncation.
- Performance: Existing threads cover full-list re-sort/re-materialization and batch-size overrun risks.
- Observability: Logging was added around split fetching/replay, and no additional observability issue beyond existing comments was found.
- User focus: No additional user-provided review focus was supplied.
Please address the existing inline review threads before this can be considered safe to merge.
/review
Review completed against the current PR diff and existing inline threads. I found a new blocking correctness issue: after a snapshot batch commits, the job can immediately create another task using the just-committed snapshot offset while async split fetching has not produced the next batch yet, causing duplicate snapshot reads. I also found flaky assertions in the new async split regression tests because they stop a background sampler before synchronously verifying the final chunk_list length.
Critical checkpoint conclusions:
- Goal and tests: the async split goal is partially covered by new tests, but the task scheduling handoff between committed snapshot batches and the next async fetch still has a correctness gap; some tests are nondeterministic.
- Scope/focus: the main implementation is focused on async split fetching and restart support, but the scheduler/task interaction needs one more guard.
- Concurrency: split collections are mostly protected by splitsLock in the modified paths; the remaining issue is lifecycle ordering between task success and scheduler-driven advanceSplits, not a raw collection race.
- Lifecycle/restart: restart paths were reviewed with the existing comments in mind; no additional distinct restart issue found beyond the scheduling gap and already-known threads.
- Configuration: the new batch-size config is wired into FE fetchSplits requests; no additional config issue found.
- Compatibility: no new FE/BE protocol compatibility issue found in the current PR file list.
- Parallel paths: MySQL, generic JDBC/Postgres, and TVF paths were checked; the sampler-race test pattern appears in multiple new uneven async split tests.
- Error handling/observability: no additional distinct error-handling issue found beyond existing review threads.
- Persistence/transactions: meta persistence concerns are already covered by existing threads; no additional distinct persistence issue found.
- Performance: no additional distinct performance issue found beyond existing batch-size and lock-hold threads.
User focus: no additional user-provided review focus was supplied.
TPC-H: Total hot run time: 29585 ms
TPC-DS: Total hot run time: 171857 ms
FE UT Coverage Report: Increment line coverage
… stabilize uneven sampler assertions
run buildall
/review
run buildall
I found two additional blocking issues in the async split path.
Critical checkpoint conclusions:
- Goal/tests: the goal is clear (make CDC snapshot splitting scheduler-driven) and partially covered by tests, but TVF restart recovery is still incomplete and can skip remaining snapshot chunks.
- Scope: the change is generally focused, but the new mutable batch-size config lacks validation.
- Concurrency: concurrency around FE split lists is improved from prior revisions, with remaining restart-state correctness risk noted inline.
- Lifecycle/recovery: recovery is involved and still has a TVF cursor restoration bug.
- Config: a new dynamic config is added, but invalid values are not rejected.
- Compatibility: no storage-format incompatibility found beyond the persisted provider fields, which have replay handling for the non-TVF path.
- Parallel paths: the FROM-TO and TVF code paths are not fully aligned because TVF replay does not rebuild cdcSplitProgress.
- Tests: tests exist but do not cover the no-committed-txn/partial-meta TVF restart cursor case or an invalid batch-size config.
- Observability: adequate for normal RPC failures, but not enough to prevent the invalid-config no-progress loop.
- Transactions/persistence: correctness is affected for TVF restart as described; no data write/MoW/visible-version BE concerns apply.
- User focus: no additional user-provided review focus was specified.
@@ -312,9 +300,12 @@ public void updateOffset(Offset offset) {
*/
@Override
public void replayIfNeed(StreamingInsertJob job) throws JobException {
Restoring cachedSyncTables alone is not enough for TVF mid-snapshot recovery. This override restores remainingSplits/finishedSplits from streaming_job_meta and committed offsets, but unlike the base JdbcSourceOffsetProvider.replayIfNeed() it never rebuilds cdcSplitProgress from the last restored mid-table split. After FE restart, cdcSplitProgress.currentSplittingTable stays null; because the single TVF table is already present in remainingSplits/finishedSplits, inherited noMoreSplits() returns true and advanceSplitsIfNeed() stops fetching subsequent batches. Once the restored batch is drained, the job can transition to binlog/snapshot completion and skip the rest of the snapshot. Please mirror the base replay cursor restoration (findResumeMidSplit + applySplitToProgress, or equivalent) for the TVF path after restoring split metadata. This is distinct from the earlier cachedSyncTables-null issue because the table cache is now restored, but the async fetch cursor is still missing.
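A hedged sketch of the cursor restoration the reviewer asks for: after restoring split metadata, rebuild the async fetch cursor from the last restored mid-table split so `noMoreSplits()` does not report completion prematurely. The types and helper below are hypothetical stand-ins, not the PR's actual `findResumeMidSplit`/`applySplitToProgress` code.

```java
import java.util.List;

// Illustrative sketch only: rebuild the async split-fetch cursor on replay.
// Everything here is a simplified stand-in for the real provider state.
public class TvfReplayCursorSketch {
    public static class Progress {
        public String currentSplittingTable; // null means "nothing left to cut"
        public String resumeFromSplitId;     // last restored split of a mid-cut table
    }

    // If the table's snapshot is not fully cut, resume from the last restored
    // split instead of leaving the cursor empty (which would make an inherited
    // noMoreSplits() check return true and stop fetching further batches).
    public static Progress rebuildCursor(String table, List<String> restoredSplitIds,
                                         boolean tableFullyCut) {
        Progress p = new Progress();
        if (!tableFullyCut && !restoredSplitIds.isEmpty()) {
            p.currentSplittingTable = table;
            p.resumeFromSplitId = restoredSplitIds.get(restoredSplitIds.size() - 1);
        }
        return p;
    }
}
```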
public static int streaming_cdc_heavy_rpc_timeout_sec = 600;

@ConfField(mutable = true, masterOnly = true)
public static int streaming_cdc_fetch_splits_batch_size = 100;
This mutable config is used directly as FetchTableSplitsRequest.batchSize, and both cdc_client split loops run only while result.size() < batchSize. If an operator sets this to 0 or a negative value dynamically, cdc_client returns an empty split batch without error; FE then keeps the current table cursor unchanged and will repeatedly see no progress. Please validate/clamp this config to a positive value before sending it (or fail the scheduler tick with a clear error) so an invalid dynamic config cannot wedge async splitting.
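A minimal sketch of the suggested guard, assuming the FE clamps the value before building the fetchSplits request. The config name follows the PR; the default fallback and the clamp itself are illustrative, not the actual fix.

```java
// Hypothetical guard for the mutable batch-size config: never let an invalid
// dynamic value (0 or negative) reach cdc_client, where it would produce
// empty split batches and wedge async splitting with no visible error.
public class BatchSizeGuard {
    static final int DEFAULT_BATCH_SIZE = 100; // matches the config default in the PR

    public static int effectiveBatchSize(int configured) {
        return configured > 0 ? configured : DEFAULT_BATCH_SIZE;
    }
}
```

An alternative, as the comment notes, is to fail the scheduler tick with a clear error instead of silently clamping; either way the invalid value must not translate into a zero-progress loop.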
TPC-H: Total hot run time: 29840 ms
TPC-H: Total hot run time: 29722 ms
TPC-DS: Total hot run time: 169096 ms
TPC-DS: Total hot run time: 172522 ms
TPC-H: Total hot run time: 29750 ms
TPC-DS: Total hot run time: 171441 ms
Summary
`StreamingInsertJob` (CDC FROM-TO and `cdc_stream` TVF paths) used to call `splitChunks()` synchronously inside `CREATE STREAMING JOB`, asking cdc_client to cut every chunk of every table before returning. On large/non-uniform PK tables this can take 30+ minutes, far beyond the BE→cdc_client BRPC 60s timeout, and the SQL client blocks the whole time. This PR makes splitting tick-driven by the FE scheduler:
- `CREATE` returns immediately; no more synchronous `splitChunks()`. Each scheduler tick, `advanceSplits()` issues one short fetchSplits RPC (default `batchSize=100`) and pushes that batch into `remainingSplits`. Tasks dispatch as soon as the first batch lands, so end-to-end first-byte latency stays close to flink-cdc's.
- cdc_client rebuilds the `ChunkSplitter` from the `(currentSplittingTable, nextSplitStart, nextSplitId)` triple supplied by FE; flink-cdc internals are untouched (uses the public `ChunkSplitter` API only).
- Progress lives in `committedSplitProgress` (3-field `SplitProgress`) plus the existing `chunkHighWatermarkMap`/`binlogOffset`. The `streaming_job_meta` system table holds the full `chunk_list` JSON per table (UPSERT on each `advanceSplits`).
- New `SourceOffsetProvider#initSplitProgress`/`noMoreSplits`/`advanceSplits` interface; `StreamingJobSchedulerTask.handlePendingState` pre-advances one batch so the first task doesn't wait a full `max_interval`.

Detailed design lives in the linked plan.
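The tick-driven flow above can be sketched as follows. `remainingSplits` and the per-tick batching mirror the PR summary; the integer cursor is a simplified stand-in for the persisted `SplitProgress`, and the class itself is illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Simplified sketch of scheduler-driven splitting: each tick performs one
// short, bounded fetch and enqueues the batch, so CREATE never blocks on
// chunking. Names other than advanceSplits/remainingSplits are illustrative.
public class TickDrivenSplitter {
    public final Deque<String> remainingSplits = new ArrayDeque<>();
    public int cursor = 0; // stand-in for the persisted resume cursor

    // One scheduler tick: fetch at most batchSize chunks and enqueue them.
    public void advanceSplits(List<String> allChunks, int batchSize) {
        int end = Math.min(cursor + batchSize, allChunks.size());
        remainingSplits.addAll(allChunks.subList(cursor, end));
        cursor = end; // in the real flow this progress is persisted for restart
    }

    public boolean noMoreSplits(List<String> allChunks) {
        return cursor >= allChunks.size();
    }
}
```

Tasks can start draining `remainingSplits` after the first tick, which is why first-byte latency stays low even while later batches are still being cut.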
Changes
- fe-common: `FetchTableSplitsRequest` adds `nextSplitStart` (Object[]) / `nextSplitId` / `batchSize`.
- fe-core: `SourceOffsetProvider` adds 3 default methods: `initSplitProgress`/`advanceSplits`/`noMoreSplits`. `JdbcSourceOffsetProvider` implements the async state machine (committed/`cdcSplitProgress`, `advanceSplits`, dedup, system-table UPSERT, replay path). `JdbcTvfSourceOffsetProvider.initOnCreate` no longer pre-splits; it relies on the same scheduler tick path. `StreamingInsertJob` carries `syncTables` (`@SerializedName("st")`); `initSourceJob`/`initInsertJob` initialize `SplitProgress`; `advanceSplitsIfNeed()` mirrors `fetchMeta` error handling (PAUSE on failure). `StreamingJobSchedulerTask.handlePendingState`/`handleRunningState` call `advanceSplitsIfNeed()` each tick; the PENDING handler pre-advances and short-circuits if PAUSED. `StreamingJobUtils.upsertChunkList` covers id allocation via a `MAX(id)+1` lookup.
- cdc_client: `JdbcIncrementalSourceReader.getSourceSplits()` rebuilt around the public `ChunkSplitter` API (no more in-memory loop / reflection hack).

Tests
- `SplitProgressTest`: copy/null-field semantics.
- `JdbcSourceOffsetProviderAsyncSplitTest`: covers `advanceSplits` (first call / continue same table / cross-table switch / dedup / empty batch), `noMoreSplits`, `updateOffset` committed-progress advancement (mid-chunk vs last chunk vs replay missing-split path), and `computeCdcRemainingTables`.
- `test_streaming_postgres_job_async_split.groovy`: 100 rows × `snapshot_split_size=5` → 20 splits across multiple ticks; asserts CREATE returns < 30s, full snapshot count + DISTINCT id, then INSERT/UPDATE/DELETE in binlog phase.

Test plan
- `mvn test -pl fe/fe-core -Dtest=JdbcSourceOffsetProviderAsyncSplitTest,SplitProgressTest`
- Run the `test_streaming_postgres_job_async_split` regression locally: `CREATE` returns in seconds, `SHOW STREAMING JOB` immediately reflects the new job, snapshot completes, binlog phase healthy.
- `cdc_stream` TVF + StreamingInsertJob path: confirm CREATE no longer blocks.